{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Load the libraries"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [],
   "source": [
    "import os\n",
    "import numpy as np\n",
    "import pandas as pd\n",
    "from pyspark.sql.types import *\n",
    "from pyspark.ml import Pipeline\n",
    "from pyspark.sql import functions as f\n",
    "from pyspark.sql.functions import udf, StringType\n",
    "from pyspark.sql import SparkSession, functions as F\n",
    "from pyspark.ml.evaluation import MulticlassClassificationEvaluator\n",
    "from pyspark.ml.classification import MultilayerPerceptronClassifier\n",
    "from pyspark.ml.feature import OneHotEncoder, VectorAssembler, StringIndexer"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Initialize Spark Session"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Build (or, per getOrCreate, reuse) the Spark session for the 'pyspark-dl' application.\n",
    "spark = (\n",
    "    SparkSession.builder\n",
    "    .appName('pyspark-dl')\n",
    "    .getOrCreate()\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Read the Dataset"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Read the CSV with its first row as the header and let Spark infer the column types.\n",
    "csv_reader = spark.read.option('header', True).option('inferSchema', True)\n",
    "web_data = csv_reader.csv('data_set.csv')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "root\n",
      " |-- Visit_Number_Bucket: string (nullable = true)\n",
      " |-- Page_Views_Normalized: double (nullable = true)\n",
      " |-- Orders_Normalized: integer (nullable = true)\n",
      " |-- Internal_Search_Successful_Normalized: double (nullable = true)\n",
      " |-- Internal_Search_Null_Normalized: double (nullable = true)\n",
      " |-- Email_Signup_Normalized: double (nullable = true)\n",
      " |-- Total_Seconds_Spent_Normalized: double (nullable = true)\n",
      " |-- Store_Locator_Search_Normalized: double (nullable = true)\n",
      " |-- Mapped_Last_Touch_Channel: string (nullable = true)\n",
      " |-- Mapped_Mobile_Device_Type: string (nullable = true)\n",
      " |-- Mapped_Browser_Type: string (nullable = true)\n",
      " |-- Mapped_Entry_Pages: string (nullable = true)\n",
      " |-- Mapped_Site_Section: string (nullable = true)\n",
      " |-- Mapped_Promo_Code: string (nullable = true)\n",
      " |-- Maped_Product_Name: string (nullable = true)\n",
      " |-- Mapped_Search_Term: string (nullable = true)\n",
      " |-- Mapped_Product_Collection: string (nullable = true)\n",
      "\n"
     ]
    }
   ],
   "source": [
    "# Show the column names and the types inferred by the CSV reader (inferSchema=True).\n",
    "# The later feature selection keys off these inferred types, so check them here.\n",
    "web_data.printSchema()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Rename Target Column"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [],
   "source": [
    "# The classifier is configured with labelCol='label', so give the target column that name.\n",
    "web_data_renamed = web_data.withColumnRenamed(\n",
    "    existing='Orders_Normalized',\n",
    "    new='label',\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "root\n",
      " |-- Visit_Number_Bucket: string (nullable = true)\n",
      " |-- Page_Views_Normalized: double (nullable = true)\n",
      " |-- label: integer (nullable = true)\n",
      " |-- Internal_Search_Successful_Normalized: double (nullable = true)\n",
      " |-- Internal_Search_Null_Normalized: double (nullable = true)\n",
      " |-- Email_Signup_Normalized: double (nullable = true)\n",
      " |-- Total_Seconds_Spent_Normalized: double (nullable = true)\n",
      " |-- Store_Locator_Search_Normalized: double (nullable = true)\n",
      " |-- Mapped_Last_Touch_Channel: string (nullable = true)\n",
      " |-- Mapped_Mobile_Device_Type: string (nullable = true)\n",
      " |-- Mapped_Browser_Type: string (nullable = true)\n",
      " |-- Mapped_Entry_Pages: string (nullable = true)\n",
      " |-- Mapped_Site_Section: string (nullable = true)\n",
      " |-- Mapped_Promo_Code: string (nullable = true)\n",
      " |-- Maped_Product_Name: string (nullable = true)\n",
      " |-- Mapped_Search_Term: string (nullable = true)\n",
      " |-- Mapped_Product_Collection: string (nullable = true)\n",
      "\n"
     ]
    }
   ],
   "source": [
    "# Confirm the rename: the former Orders_Normalized column should now be listed as 'label',\n",
    "# with every other column unchanged.\n",
    "web_data_renamed.printSchema()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Split the dataset into Train, Validation and Test"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Split weights are 0.7 / 0.2 / 0.1 (train / validation / test), with a fixed seed of 1234.\n",
    "train, validation, test = web_data_renamed.randomSplit(\n",
    "    weights=[0.7, 0.2, 0.1],\n",
    "    seed=1234,\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Build Pipeline"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Partition the feature columns by their inferred Spark type.\n",
    "# The target is excluded by name: otherwise it stays out of the features only for as long as\n",
    "# the CSV reader happens to infer it as 'integer', and would leak into the model inputs if it\n",
    "# were ever inferred as 'double' or 'string'.\n",
    "feature_dtypes = [(name, dtype) for name, dtype in web_data_renamed.dtypes if name != 'label']\n",
    "\n",
    "# String columns are index-encoded later; double columns are used as they are.\n",
    "categorical_columns = [name for name, dtype in feature_dtypes if dtype.startswith('string')]\n",
    "numeric_columns = [name for name, dtype in feature_dtypes if dtype.startswith('double')]"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [],
   "source": [
    "# One StringIndexer per categorical column, each writing to '<column>_index'.\n",
    "# The indexers are fitted on the training split only, so the validation and test splits can\n",
    "# contain nulls or categories that were never seen during fitting. With the default\n",
    "# handleInvalid='error' that makes transform() raise; 'keep' maps such values to one extra\n",
    "# index instead, so every split can always be transformed.\n",
    "indexers = [\n",
    "    StringIndexer(\n",
    "        inputCol=column,\n",
    "        outputCol='{0}_index'.format(column),\n",
    "        handleInvalid='keep',\n",
    "    )\n",
    "    for column in categorical_columns\n",
    "]\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Assemble the indexed categorical columns followed by the numeric columns into one vector.\n",
    "# NOTE(review): the category indices go in as plain numbers (OneHotEncoder is imported but\n",
    "# not used) -- confirm that this ordinal treatment is intended.\n",
    "featuresCreator = VectorAssembler(\n",
    "    inputCols=[stage.getOutputCol() for stage in indexers] + numeric_columns,\n",
    "    outputCol=\"features\",\n",
    ")\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Network shape: one input unit per assembled column (each assembled column is a single\n",
    "# number), hidden layers of 4 and 2 units, and 2 output units.\n",
    "# NOTE(review): 2 output units assumes the label takes exactly two values -- confirm.\n",
    "layers = [\n",
    "    len(featuresCreator.getInputCols()),\n",
    "    4,\n",
    "    2,\n",
    "    2,\n",
    "]\n",
    "\n",
    "classifier = MultilayerPerceptronClassifier(\n",
    "    featuresCol='features',\n",
    "    labelCol='label',\n",
    "    layers=layers,\n",
    "    maxIter=100,\n",
    "    blockSize=128,\n",
    "    seed=1234,\n",
    ")\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Stage order: index every categorical column, assemble the feature vector, then classify.\n",
    "pipeline = Pipeline(stages=[*indexers, featuresCreator, classifier])"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Fit Pipeline"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Fit all pipeline stages (indexers, assembler, classifier) on the training split only;\n",
    "# the validation and test splits are not used here.\n",
    "model = pipeline.fit(train)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Get Pipeline Output"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Apply the fitted pipeline to each split, in train / validation / test order.\n",
    "train_output_df, validation_output_df, test_output_df = (\n",
    "    model.transform(split) for split in (train, validation, test)\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Evaluate the Predictions"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 15,
   "metadata": {
    "scrolled": true
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Train weightedPrecision = 0.976101874447846\n",
      "Validation weightedPrecision = 0.9765821626938243\n",
      "Test weightedPrecision = 0.9747324280445043\n",
      "Train weightedRecall = 0.9755751041220662\n",
      "Validation weightedRecall = 0.9761613691931541\n",
      "Test weightedRecall = 0.9742582305920606\n",
      "Train accuracy = 0.975575104122066\n",
      "Validation accuracy = 0.976161369193154\n",
      "Test accuracy = 0.9742582305920607\n"
     ]
    }
   ],
   "source": [
    "# Keep only the prediction and label columns of each scored split.\n",
    "train_predictionAndLabels = train_output_df.select(\"prediction\", \"label\")\n",
    "validation_predictionAndLabels = validation_output_df.select(\"prediction\", \"label\")\n",
    "test_predictionAndLabels = test_output_df.select(\"prediction\", \"label\")\n",
    "\n",
    "# Splits are listed in the order they are reported for each metric.\n",
    "evaluation_sets = [\n",
    "    ('Train', train_predictionAndLabels),\n",
    "    ('Validation', validation_predictionAndLabels),\n",
    "    ('Test', test_predictionAndLabels),\n",
    "]\n",
    "metrics = ['weightedPrecision', 'weightedRecall', 'accuracy']\n",
    "\n",
    "for metric in metrics:\n",
    "    # The evaluator is given only the metric name, so it uses its default column names.\n",
    "    evaluator = MulticlassClassificationEvaluator(metricName=metric)\n",
    "    for split_name, predictions in evaluation_sets:\n",
    "        score = evaluator.evaluate(predictions)\n",
    "        print(split_name + ' ' + metric + ' = ' + str(score))"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.3"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
